package com.ruyuan.eshop.push.job.handler;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.base.Splitter;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.enums.YesOrNoEnum;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformHotProductUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.MembershipPointApi;
import com.ruyuan.eshop.membership.domain.dto.MembershipFilterDTO;
import com.ruyuan.eshop.membership.domain.dto.MembershipFilterConditionDTO;
import com.ruyuan.eshop.membership.domain.dto.MembershipPointDTO;
import com.ruyuan.eshop.persona.api.PersonaApi;
import com.ruyuan.eshop.persona.domain.dto.PersonaFilterConditionDTO;
import com.ruyuan.eshop.push.converter.ConditionConverter;
import com.ruyuan.eshop.push.dao.HotGoodsCrontabDAO;
import com.ruyuan.eshop.push.dao.MessagePushCrontabDAO;
import com.ruyuan.eshop.push.domain.dto.PushMessageDTO;
import com.ruyuan.eshop.push.domain.entity.HotGoodsCrontabDO;
import com.ruyuan.eshop.push.domain.entity.MessagePushCrontabDO;
import com.ruyuan.eshop.push.domain.vo.HotGoodsVO;
import com.ruyuan.eshop.push.mq.producer.DefaultProducer;
import com.ruyuan.eshop.push.service.MessagePushService;
import com.ruyuan.eshop.push.splitter.ListSplitter;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class ScheduleSendMessageJobHandler {

    @Resource
    private HotGoodsCrontabDAO hotGoodsCrontabDAO;

    @Resource
    private MessagePushCrontabDAO messagePushCrontabDAO;

    @DubboReference(version = "1.0.0")
    private MembershipPointApi membershipPointApi;

    @Autowired
    private MessagePushService messagePushService;

    /**
     * 用户画像服务
     */
    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    /**
     * 条件转换器
     */
    @Autowired
    private ConditionConverter conditionConverter;
    /**
     * mq生产者
     */
    @Resource
    private DefaultProducer defaultProducer;

    /**
     * 公用的消息推送线程池
     */
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    /**
     * 执行定时任务，筛选热门商品和用户发送给MQ
     */
    @XxlJob("hotGoodsPushHandler")
    public void hotGoodsPushHandler() {
        log.info("hotGoodsPushHandler 开始执行");

        // 获取热门商品和用户画像，业务先简化为一对一关系
        List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date());
        log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));

        // 找出每个热门商品对应画像所匹配的用户
        for (HotGoodsCrontabDO crontabDO : crontabDOs) {
            log.info("自动分片逻辑, 当前任务：crontabDO:{}", JsonUtil.object2Json(crontabDO));
            if (StringUtils.isEmpty(crontabDO.getPortrayal())) {
                continue;
            }

            // 热门商品对应的画像实体
            MembershipPointDTO membershipPointDTO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDTO.class);

            if (Objects.isNull(membershipPointDTO)) {
                continue;
            }

            // 获取匹配画像的用户积分实体
            MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDTO);

            PersonaFilterConditionDTO personaFilterConditionDTO = conditionConverter
                    .convertFilterCondition(conditionDTO);
            // 获取画像用户匹配的用户id最大最小值
            log.info("用户查询条件：{}",personaFilterConditionDTO);
            JsonResult<Long> accountMaxIdResult = personaApi.queryMaxIdByCondition(personaFilterConditionDTO);
            log.info("获取最大id，result:{}", JsonUtil.object2Json(accountMaxIdResult));
            if (!accountMaxIdResult.getSuccess()) {
                log.info("获取最大id失败，condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                throw new BaseBizException(accountMaxIdResult.getErrorCode(), accountMaxIdResult.getErrorMessage());
            }
            JsonResult<Long> accountMinIdResult = personaApi.queryMinIdByCondition(personaFilterConditionDTO);
            log.info("获取最小id，result:{}", JsonUtil.object2Json(accountMinIdResult));
            if (!accountMinIdResult.getSuccess()) {
                log.info("获取最小id失败，condition:{}", JsonUtil.object2Json(personaFilterConditionDTO));
                throw new BaseBizException(accountMinIdResult.getErrorCode(), accountMinIdResult.getErrorMessage());
            }

            // 需要执行推送的用户起始id
            // 注意：这是一个预估值，因为最小id 到 最大id中间会有很多不符合条件的用户
            // 针对这些用户，需要在下一层的业务逻辑中，用选人条件过滤掉
            Long minUserId = accountMinIdResult.getData();
            Long maxUserId = accountMaxIdResult.getData();

            // bucket，就是一个用户分片，对应的是一个startUserId->endUserId，用户id范围
            final int userBucketSize = 1000;
            // messageBatchSize，消息batch大小的，rocketmq的每个batch消息包含了100个推送任务
            // 1w个推送任务，会合并为100个batch，主要进行100次网络通信给rocketmq就可以了，大幅度降低发送消息的耗时
            // 就可以根据一定的算法，把千万级用户推送任务分片，比如一个推送任务包含1000个用户/2000个用户
            // 上万条key-value对，每个key-value对就是一个startUserId->endUserId，推送任务分片
            Map<Long, Long> userBuckets = new LinkedHashMap<>();
            AtomicBoolean doSharding = new AtomicBoolean(true);
            long startUserId = minUserId;
            log.info("开始对任务人群进行分片，startId:{}",minUserId);
            while (doSharding.get()) {
                if ((maxUserId -minUserId) < userBucketSize) {
                    userBuckets.put(startUserId, maxUserId);
                    doSharding.compareAndSet(true, false);
                    break;
                }
                userBuckets.put(startUserId, startUserId + userBucketSize);
                startUserId += userBucketSize;
                maxUserId -= userBucketSize;
            }

            // 把可能成千上万的推送任务进行rocketmq消息的batch合并，以batch模式一批一批的发送任务到mq里去，跟rocketmq网络通信的耗时就少了
            List<String> hotProductPushTasks = new ArrayList<>();
            HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO);
            PlatformHotProductUserBucketMessage bucketMessage = PlatformHotProductUserBucketMessage.builder()
                    .hotGoodsVO(JSON.toJSONString(hotGoodsVO))
                    .personaFilterConditionDTO(JSON.toJSONString(personaFilterConditionDTO))
                    .build();
            for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
                bucketMessage.setEndUserId(userBucket.getValue());
                bucketMessage.setStartUserId(userBucket.getKey());

                String promotionPushTaskJSON = JsonUtil.object2Json(bucketMessage);
                log.info("用户桶构建侧选人条件：{}",bucketMessage.getPersonaFilterConditionDTO());
                hotProductPushTasks.add(promotionPushTaskJSON);
            }
            ListSplitter splitter = new ListSplitter(hotProductPushTasks, MESSAGE_BATCH_SIZE);
            while(splitter.hasNext()){
                List<String> sendBatch = splitter.next();
                log.info("本次批次消息数量,{}",sendBatch.size());
                sharedSendMsgThreadPool.execute(() -> {
                    defaultProducer.sendMessages(RocketMqConstant.PLATFORM_HOT_PRODUCT_USER_BUCKET_SEND_TOPIC,
                            sendBatch, "平台热门商品定时任务用户桶消息");
                });
            }
        }
    }

    @SuppressWarnings("UnstableApiUsage")
    private HotGoodsVO buildHotGoodsVO(HotGoodsCrontabDO crontabDO) {
        List<String> keywords = Splitter.on(",")
                .splitToList(crontabDO.getKeywords());

        HotGoodsVO hotGoodsVO = new HotGoodsVO();
        hotGoodsVO.setGoodsName(crontabDO.getGoodsName());
        hotGoodsVO.setGoodsDesc(crontabDO.getGoodsDesc());
        hotGoodsVO.setKeyWords(keywords);

        return hotGoodsVO;
    }

    private MembershipFilterConditionDTO buildCondition(MembershipPointDTO membershipPointDTO) {
        return MembershipFilterConditionDTO.builder()
                .memberLevel(membershipPointDTO.getMemberLevel())
                .memberPoint(membershipPointDTO.getMemberPoint())
                .build();
    }

    /**
     * 执行定时任务，筛选并执行 需要定时发送消息的优惠券等活动消息 的任务
     */
    @XxlJob("messagePushHandler")
    public void messagePushHandler() {
        log.info("messagePushHandler 开始执行");

        // 查询出当前时间需要执行的任务，小于等于当前时间且没有执行过的任务
        LambdaQueryWrapper<MessagePushCrontabDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.le(MessagePushCrontabDO::getCrontabTime, new Date())
                .eq(MessagePushCrontabDO::getExecuteFlag, YesOrNoEnum.NO.getCode());
        List<MessagePushCrontabDO> messagePushCrontabDOS = messagePushCrontabDAO.list(queryWrapper);

        log.info("获取需要定时发送消息的优惠券等活动消息任务, messagePushCrontabDOS:{}", JsonUtil.object2Json(messagePushCrontabDOS));

        for (MessagePushCrontabDO messagePushCrontabDO : messagePushCrontabDOS) {
            sendMessage(messagePushCrontabDO);
            // 消息发送成功，将数据变更为已执行
            messagePushCrontabDO.setExecuteFlag(YesOrNoEnum.YES.getCode());
        }
        // 已经执行过的任务，修改数据库
        messagePushCrontabDAO.updateBatchById(messagePushCrontabDOS);
        log.info("hotGoodsPushHandler 执行结束");
    }

    private void sendMessage(MessagePushCrontabDO messagePushCrontabDO) {
        MembershipFilterDTO membershipFilterDTO =
                JsonUtil.json2Object(messagePushCrontabDO.getFilterCondition(), MembershipFilterDTO.class);

        PushMessageDTO pushMessageDTO = PushMessageDTO.builder()
                .mainMessage(messagePushCrontabDO.getMainMessage())
                .message(messagePushCrontabDO.getMessageInfo())
                .informType(messagePushCrontabDO.getInformType())
                .build();

        messagePushService.pushMessages(pushMessageDTO, membershipFilterDTO);
    }
}
